{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved. \n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-arcadia/spark_job_on_synapse_spark_pool.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Using Synapse Spark Pool as a Compute Target from Azure Machine Learning Remote Run\n",
        "1. To use Synapse Spark Pool as a compute target from Experiment Run, [ScriptRunConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.script_run_config.scriptrunconfig?view=azure-ml-py) is used, the same as other Experiment Runs. This notebook demonstrates how to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster.\n",
        "2. To use Synapse Spark Pool as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [SynapseSparkStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.synapse_spark_step.synapsesparkstep?view=azure-ml-py) is used. This notebook demonstrates how to leverage SynapseSparkStep in Azure Machine Learning Pipeline.\n",
        "\n",
        "## Before you begin:\n",
        "1. **Create an Azure Synapse workspace**, check [this] (https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-workspace) for more information.\n",
        "2. **Create Spark Pool in Synapse workspace**: check [this] (https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-apache-spark-pool-portal) for more information."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Azure Machine Learning and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "import azureml.core\n",
        "from azureml.core import Workspace, Experiment\n",
        "from azureml.core import LinkedService, SynapseWorkspaceLinkedServiceConfiguration\n",
        "from azureml.core.compute import ComputeTarget, AmlCompute, SynapseCompute\n",
        "from azureml.exceptions import ComputeTargetException\n",
        "from azureml.data import HDFSOutputDatasetConfig\n",
        "from azureml.core.datastore import Datastore\n",
        "from azureml.core.runconfig import RunConfiguration\n",
        "from azureml.core.conda_dependencies import CondaDependencies\n",
        "from azureml.pipeline.core import Pipeline\n",
        "from azureml.pipeline.steps import PythonScriptStep, SynapseSparkStep\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Link Synapse workspace to AML \n",
        "You have to be an \"Owner\" of Synapse workspace resource to perform linking. You can check your role in the Azure resource management portal, if you don't have an \"Owner\" role, you can contact an \"Owner\" to link the workspaces for you."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "\n",
        "# Replace with your resource info before running.\n",
        "\n",
        "synapse_subscription_id=os.getenv(\"SYNAPSE_SUBSCRIPTION_ID\", \"<my-synapse-subscription-id>\")\n",
        "synapse_resource_group=os.getenv(\"SYNAPSE_RESOURCE_GROUP\", \"<my-synapse-resource-group>\")\n",
        "synapse_workspace_name=os.getenv(\"SYNAPSE_WORKSPACE_NAME\", \"<my-synapse-workspace-name>\")\n",
        "synapse_linked_service_name=os.getenv(\"SYNAPSE_LINKED_SERVICE_NAME\", \"<my-synapse-linked-service-name>\")\n",
        "\n",
        "synapse_link_config = SynapseWorkspaceLinkedServiceConfiguration(\n",
        "    subscription_id=synapse_subscription_id,\n",
        "    resource_group=synapse_resource_group,\n",
        "    name=synapse_workspace_name\n",
        ")\n",
        "\n",
        "linked_service = LinkedService.register(\n",
        "    workspace=ws,\n",
        "    name=synapse_linked_service_name,\n",
        "    linked_service_config=synapse_link_config)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Linked service property\n",
        "\n",
        "A MSI (system_assigned_identity_principal_id) will be generated for each linked service, for example:\n",
        "\n",
        "name=synapselink,</p>\n",
        "type=Synapse, </p>\n",
        "linked_service_resource_id=/subscriptions/4faaaf21-663f-4391-96fd-47197c630979/resourceGroups/static_resources_synapse_test/providers/Microsoft.Synapse/workspaces/synapsetest2, </p>\n",
        "system_assigned_identity_principal_id=eb355d52-3806-4c5a-aec9-91447e8cfc2e </p>\n",
        "\n",
        "#### Make sure you grant \"Synapse Apache Spark Administrator\" role of the synapse workspace to the generated workspace linking MSI in Synapse studio portal before you submit job."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "linked_service"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "LinkedService.list(ws)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Attach Synapse spark pool as AML compute target"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "synapse_spark_pool_name=os.getenv(\"SYNAPSE_SPARK_POOL_NAME\", \"<my-synapse-spark-pool-name>\")\n",
        "synapse_compute_name=os.getenv(\"SYNAPSE_COMPUTE_NAME\", \"<my-synapse-compute-name>\")\n",
        "\n",
        "attach_config = SynapseCompute.attach_configuration(\n",
        "        linked_service,\n",
        "        type=\"SynapseSpark\",\n",
        "        pool_name=synapse_spark_pool_name)\n",
        "\n",
        "synapse_compute=ComputeTarget.attach(\n",
        "        workspace=ws,\n",
        "        name=synapse_compute_name,\n",
        "        attach_configuration=attach_config)\n",
        "\n",
        "synapse_compute.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Start an experiment run"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prepare data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Use the default blob storage\n",
        "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
        "print('Datastore {} will be used'.format(def_blob_store.name))\n",
        "\n",
        "# We are uploading a sample file in the local directory to be used as a datasource\n",
        "file_name = \"Titanic.csv\"\n",
        "def_blob_store.upload_files(files=[\"./{}\".format(file_name)], overwrite=False)\n",
        " "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Tabular dataset as input"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Dataset\n",
        "titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(def_blob_store, file_name)])\n",
        "input1 = titanic_tabular_dataset.as_named_input(\"tabular_input\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## File dataset as input"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Dataset\n",
        "titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])\n",
        "input2 = titanic_file_dataset.as_named_input(\"file_input\").as_hdfs()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Output config: the output will be registered as a File dataset\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.data import HDFSOutputDatasetConfig\n",
        "output = HDFSOutputDatasetConfig(destination=(def_blob_store,\"test\")).register_on_complete(name=\"registered_dataset\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Dataprep script"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "os.makedirs(\"code\", exist_ok=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile code/dataprep.py\n",
        "import os\n",
        "import sys\n",
        "import azureml.core\n",
        "from pyspark.sql import SparkSession\n",
        "from azureml.core import Run, Dataset\n",
        "\n",
        "print(azureml.core.VERSION)\n",
        "print(os.environ)\n",
        "\n",
        "import argparse\n",
        "parser = argparse.ArgumentParser()\n",
        "parser.add_argument(\"--tabular_input\")\n",
        "parser.add_argument(\"--file_input\")\n",
        "parser.add_argument(\"--output_dir\")\n",
        "args = parser.parse_args()\n",
        "\n",
        "# use dataset sdk to read tabular dataset\n",
        "run_context = Run.get_context()\n",
        "dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)\n",
        "sdf = dataset.to_spark_dataframe()\n",
        "sdf.show()\n",
        "\n",
        "# use hdfs path to read file dataset\n",
        "spark= SparkSession.builder.getOrCreate()\n",
        "sdf = spark.read.option(\"header\", \"true\").csv(args.file_input)\n",
        "sdf.show()\n",
        "\n",
        "sdf.coalesce(1).write\\\n",
        ".option(\"header\", \"true\")\\\n",
        ".mode(\"append\")\\\n",
        ".csv(args.output_dir)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Set up Conda dependency for the following Script Run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.environment import CondaDependencies\n",
        "conda_dep = CondaDependencies()\n",
        "conda_dep.add_pip_package(\"azureml-core==1.20.0\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## How to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import RunConfiguration\n",
        "from azureml.core import ScriptRunConfig \n",
        "from azureml.core import Experiment\n",
        "\n",
        "run_config = RunConfiguration(framework=\"pyspark\")\n",
        "run_config.target = synapse_compute_name\n",
        "\n",
        "run_config.spark.configuration[\"spark.driver.memory\"] = \"1g\" \n",
        "run_config.spark.configuration[\"spark.driver.cores\"] = 2 \n",
        "run_config.spark.configuration[\"spark.executor.memory\"] = \"1g\" \n",
        "run_config.spark.configuration[\"spark.executor.cores\"] = 1 \n",
        "run_config.spark.configuration[\"spark.executor.instances\"] = 1 \n",
        "\n",
        "run_config.environment.python.conda_dependencies = conda_dep\n",
        "\n",
        "script_run_config = ScriptRunConfig(source_directory = './code',\n",
        "                                    script= 'dataprep.py',\n",
        "                                    arguments = [\"--tabular_input\", input1, \n",
        "                                                 \"--file_input\", input2,\n",
        "                                                 \"--output_dir\", output],\n",
        "                                    run_config = run_config) "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment \n",
        "exp = Experiment(workspace=ws, name=\"synapse-spark\") \n",
        "run = exp.submit(config=script_run_config) \n",
        "run"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## How to leverage SynapseSparkStep in an AML pipeline to orchestrate data prep step on Synapse Spark and training step on AzureML compute."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Choose a name for your CPU cluster\n",
        "cpu_cluster_name = \"cpucluster\"\n",
        "\n",
        "# Verify that cluster does not exist already\n",
        "try:\n",
        "    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)\n",
        "    print('Found existing cluster, use it.')\n",
        "except ComputeTargetException:\n",
        "    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',\n",
        "                                                           max_nodes=1)\n",
        "    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)\n",
        "\n",
        "cpu_cluster.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile code/train.py\n",
        "import glob\n",
        "import os\n",
        "import sys\n",
        "from os import listdir\n",
        "from os.path import isfile, join\n",
        "\n",
        "mypath = os.environ[\"step2_input\"]\n",
        "files = [f for f in listdir(mypath) if isfile(join(mypath, f))]\n",
        "for file in files:\n",
        "    with open(join(mypath,file)) as f:\n",
        "        print(f.read())"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(def_blob_store, file_name)])\n",
        "titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])\n",
        "\n",
        "step1_input1 = titanic_tabular_dataset.as_named_input(\"tabular_input\")\n",
        "step1_input2 = titanic_file_dataset.as_named_input(\"file_input\").as_hdfs()\n",
        "step1_output = HDFSOutputDatasetConfig(destination=(def_blob_store,\"test\")).register_on_complete(name=\"registered_dataset\")\n",
        "\n",
        "step2_input = step1_output.as_input(\"step2_input\").as_download()\n",
        "\n",
        "\n",
        "from azureml.core.environment import Environment\n",
        "env = Environment(name=\"myenv\")\n",
        "env.python.conda_dependencies.add_pip_package(\"azureml-core==1.20.0\")\n",
        "\n",
        "step_1 = SynapseSparkStep(name = 'synapse-spark',\n",
        "                          file = 'dataprep.py',\n",
        "                          source_directory=\"./code\", \n",
        "                          inputs=[step1_input1, step1_input2],\n",
        "                          outputs=[step1_output],\n",
        "                          arguments = [\"--tabular_input\", step1_input1, \n",
        "                                       \"--file_input\", step1_input2,\n",
        "                                       \"--output_dir\", step1_output],\n",
        "                          compute_target = synapse_compute_name,\n",
        "                          driver_memory = \"7g\",\n",
        "                          driver_cores = 4,\n",
        "                          executor_memory = \"7g\",\n",
        "                          executor_cores = 2,\n",
        "                          num_executors = 1,\n",
        "                          environment = env)\n",
        "\n",
        "step_2 = PythonScriptStep(script_name=\"train.py\",\n",
        "                          arguments=[step2_input],\n",
        "                          inputs=[step2_input],\n",
        "                          compute_target=cpu_cluster_name,\n",
        "                          source_directory=\"./code\",\n",
        "                          allow_reuse=False)\n",
        "\n",
        "pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])\n",
        "pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": []
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "yunzhan"
      }
    ],
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "nteract": {
      "version": "0.28.0"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}